/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.queue.resource;

import com.chinamobile.cmss.lakehouse.common.dto.ResourceCheckResult;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.ConfigQueue;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.ConfigResources;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.QueuesCapacity;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.YunikornConfig;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.YunikornConfig.ConfigPartition;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.YunikornQueue;
import com.chinamobile.cmss.lakehouse.common.dto.yunikorn.YunikornQueue.QueueCapacity;
import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.core.handler.K8sDeployHandler;
import com.chinamobile.cmss.lakehouse.core.handler.K8sUriHandler;
import com.chinamobile.cmss.lakehouse.core.queue.ResourceManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
@Slf4j
public class YunikornResourceManager implements ResourceManager {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private volatile boolean firstCallCompleted = false;

    @Autowired
    private K8sUriHandler k8sUriHandler;

    @Autowired
    protected K8sDeployHandler deployHandler;

    @Override
    public boolean runnable(String queueName, int cpu, int memory) {
        YunikornQueue.QueueCapacity capacity = getQueueCapacity(queueName);
        String maxStr = capacity.getMaxcapacity();
        // remove '[' and ']'
        String[] max = maxStr.substring(1, maxStr.length() - 1).split(" ");
        assert max.length == 2 : "The length of Array(max) should be 2.";
        int maxMemory = Integer.parseInt(max[0].split(":")[1]) / 1024;
        int maxCpu = Integer.parseInt(max[1].split(":")[1]) / 1000;

        String usedStr = capacity.getUsedcapacity();
        int usedMemory = 0;
        int usedCpu = 0;
        if (!"[]".equals(usedStr)) {
            String[] used = usedStr.substring(1, usedStr.length() - 1).split(" ");
            assert used.length == 2 : "The length of Array(used) should be 2.";
            usedMemory = Integer.parseInt(used[0].split(":")[1]) / 1024;
            usedCpu = Integer.parseInt(used[1].split(":")[1]) / 1000;
        }
        int surplusMemory = maxMemory - usedMemory;
        int surplusCpu = maxCpu - usedCpu;
        if (cpu <= surplusCpu && memory <= surplusMemory) {
            return true;
        }
        return false;
    }

    @Override
    public void createQueue(ConfigQueue configQueue) {
        YunikornConfig yunikornConfig = getYunikornConfig();
        // get yunikorn config

        // check if queue list has the same queue
        List<ConfigPartition> partitions = yunikornConfig.getPartitions();
        if (CollectionUtils.isEmpty(partitions)) {
            throw new BaseException("Yunikorn partition is empty!");
        }
        ConfigPartition partition = partitions.get(0);
        if (partition == null) {
            throw new BaseException("Yunikorn partition is null!");
        }
        List<ConfigQueue> queueList = partition.getQueues();
        if (CollectionUtils.isEmpty(queueList)) {
            throw new BaseException("Yunikorn queue list ie empty!");
        }
        ConfigQueue rootQueue = queueList.get(0);
        if (rootQueue == null) {
            throw new BaseException("Yunikorn root queue is null!");
        }
        List<ConfigQueue> rootQueueList = rootQueue.getQueues();
        if (rootQueueList == null) {
            rootQueueList = new ArrayList<>();
            rootQueue.setQueues(rootQueueList);
        }
        for (ConfigQueue queue : rootQueueList) {
            if (queue.getName().equals(configQueue.getName())) {
                throw new BaseException("Yunikorn queue list has the same name queue: " + queue.getName());
            }
        }
        rootQueueList.add(configQueue);
        checkQueueCapacity(yunikornConfig);
        log.info("start to add queue: {}", configQueue);
        editYunikornConfig(yunikornConfig);
        // refresh configuration after first call
        refreshConfigurationAfterFirstCall(yunikornConfig, configQueue);
    }

    private void refreshConfigurationAfterFirstCall(YunikornConfig yunikornConfig, ConfigQueue configQueue) {
        if (!firstCallCompleted) {
            YunikornConfig currentConfig = getYunikornConfig();
            List<String> otherQueues = currentConfig.getPartitions().get(0).getQueues().get(0).getQueues()
                .stream().map(ConfigQueue::getName)
                .filter(name -> !"lakehouse".equals(name))
                .filter(name -> !configQueue.getName().equals(name))
                .collect(Collectors.toList());
            // the first call has only two queues (i.e. lakehouse and itself)
            if (otherQueues.size() == 0) {
                // Note, changes made to the configmap might have some delay to be picked up by the scheduler.
                // As a workaround, wait until scheduler are ready to pick up the configmap changes, then call secondly
                int waitMillis = 100 * 1000;
                log.info("Wait {} milliseconds for scheduler ready.", waitMillis);
                try {
                    Thread.sleep(waitMillis);
                } catch (Exception e) {
                    log.info("Failed to execute thread sleep!", e);
                }
                log.info("This is the second call with config: {}", yunikornConfig);
                editYunikornConfig(yunikornConfig);
                firstCallCompleted = true;
            }
        }
    }

    @Override
    public void deleteQueue(String queueName) {
        YunikornConfig yunikornConfig = getYunikornConfig();
        // get yunikorn config
        // check if queue list has the same queue
        List<ConfigPartition> partitions = yunikornConfig.getPartitions();
        if (CollectionUtils.isEmpty(partitions)) {
            throw new BaseException("Yunikorn partition is empty!");
        }
        ConfigPartition partition = partitions.get(0);
        if (partition == null) {
            throw new BaseException("Yunikorn partition is null!");
        }
        List<ConfigQueue> queueList = partition.getQueues();
        if (CollectionUtils.isEmpty(queueList)) {
            throw new BaseException("Yunikorn queue list ie empty!");
        }
        ConfigQueue rootQueue = queueList.get(0);
        if (rootQueue == null) {
            throw new BaseException("Yunikorn root queue is null!");
        }
        List<ConfigQueue> rootQueueList = rootQueue.getQueues();
        if (CollectionUtils.isEmpty(rootQueueList)) {
            throw new BaseException("Yunikorn root queue list is empty!");
        }
        ConfigQueue deleteQueue = null;
        for (ConfigQueue queue : rootQueueList) {
            if (queue.getName().equals(queueName)) {
                deleteQueue = queue;
            }
        }
        if (deleteQueue == null) {
            throw new BaseException("Cannot find the queue with the same name.");
        }
        rootQueueList.remove(deleteQueue);
        editYunikornConfig(yunikornConfig);
    }

    @Override
    public void editConfigQueue(String queueName, ConfigResources configResources) {
        YunikornConfig yunikornConfig = getYunikornConfig();
        // get yunikorn config
        // check if queue list has the same queue
        List<ConfigPartition> partitions = yunikornConfig.getPartitions();
        if (CollectionUtils.isEmpty(partitions)) {
            throw new BaseException("Yunikorn partition is empty!");
        }
        ConfigPartition partition = partitions.get(0);
        if (partition == null) {
            throw new BaseException("Yunikorn partition is null!");
        }
        List<ConfigQueue> queueList = partition.getQueues();
        if (CollectionUtils.isEmpty(queueList)) {
            throw new BaseException("Yunikorn queue list ie empty!");
        }
        ConfigQueue rootQueue = queueList.get(0);
        if (rootQueue == null) {
            throw new BaseException("Yunikorn root queue is null!");
        }
        List<ConfigQueue> rootQueueList = rootQueue.getQueues();
        if (CollectionUtils.isEmpty(rootQueueList)) {
            throw new BaseException("Yunikorn root queue list is empty!");
        }
        ConfigQueue editQueue = null;
        for (ConfigQueue queue : rootQueueList) {
            if (queue.getName().equals(queueName)) {
                editQueue = queue;
            }
        }
        if (editQueue == null) {
            throw new BaseException("Cannot find the queue with the same name: " + queueName);
        }
        editQueue.setResources(configResources);
        //
        checkQueueCapacity(yunikornConfig);
        editYunikornConfig(yunikornConfig);
    }

    @Override
    public ResourceCheckResult checkEditQueue(String queueName, ConfigResources configResources) {
        try {
            YunikornConfig yunikornConfig = getYunikornConfig();
            // get yunikorn config
            // check if queue list has the same queue
            List<ConfigPartition> partitions = yunikornConfig.getPartitions();
            if (CollectionUtils.isEmpty(partitions)) {
                throw new BaseException("Yunikorn partition is empty!");
            }
            ConfigPartition partition = partitions.get(0);
            if (partition == null) {
                throw new BaseException("Yunikorn partition is null!");
            }
            List<ConfigQueue> queueList = partition.getQueues();
            if (CollectionUtils.isEmpty(queueList)) {
                throw new BaseException("Yunikorn queue list ie empty!");
            }
            ConfigQueue rootQueue = queueList.get(0);
            if (rootQueue == null) {
                throw new BaseException("Yunikorn root queue is null!");
            }
            List<ConfigQueue> rootQueueList = rootQueue.getQueues();
            if (CollectionUtils.isEmpty(rootQueueList)) {
                throw new BaseException("Yunikorn root queue list is empty!");
            }
            ConfigQueue editQueue = null;
            for (ConfigQueue queue : rootQueueList) {
                if (queue.getName().equals(queueName)) {
                    editQueue = queue;
                }
            }
            if (editQueue == null) {
                throw new BaseException("Cannot find the queue with the same name: " + queueName);
            }
            editQueue.setResources(configResources);
            //
            checkQueueCapacity(yunikornConfig);
            return new ResourceCheckResult(null, true);
        } catch (Exception ex) {
            log.info("Cannot edit the queue: {}", ex.getMessage());
            return new ResourceCheckResult(ex.getMessage(), false);
        }
    }

    @Override
    public ResourceCheckResult checkCreateQueue(ConfigQueue configQueue) {
        try {
            YunikornConfig yunikornConfig = getYunikornConfig();
            // get yunikorn config

            // check if queue list has the same queue
            List<ConfigPartition> partitions = yunikornConfig.getPartitions();
            if (CollectionUtils.isEmpty(partitions)) {
                throw new BaseException("Yunikorn partition is empty!");
            }
            ConfigPartition partition = partitions.get(0);
            if (partition == null) {
                throw new BaseException("Yunikorn partition is null!");
            }
            List<ConfigQueue> queueList = partition.getQueues();
            if (CollectionUtils.isEmpty(queueList)) {
                throw new BaseException("Yunikorn queue list ie empty!");
            }
            ConfigQueue rootQueue = queueList.get(0);
            if (rootQueue == null) {
                throw new BaseException("Yunikorn root queue is null!");
            }
            List<ConfigQueue> rootQueueList = rootQueue.getQueues();
            if (rootQueueList == null) {
                rootQueueList = new ArrayList<>();
                rootQueue.setQueues(rootQueueList);
            }
            for (ConfigQueue queue : rootQueueList) {
                if (queue.getName().equals(configQueue.getName())) {
                    throw new BaseException(
                        "Yunikorn queue list has the same name queue: " + queue.getName());
                }
            }
            rootQueueList.add(configQueue);
            checkQueueCapacity(yunikornConfig);
            return new ResourceCheckResult(null, true);
        } catch (Exception ex) {
            log.info("Cannot create the queue: {}", ex.getMessage());
            return new ResourceCheckResult(ex.getMessage(), false);
        }
    }

    @Override
    public boolean existsQueueName(String queueName) {
        try {
            YunikornConfig yunikornConfig = getYunikornConfig();
            // get yunikorn config
            // check if queue list has the same queue
            List<ConfigPartition> partitions = yunikornConfig.getPartitions();
            if (CollectionUtils.isEmpty(partitions)) {
                throw new BaseException("Yunikorn partition is empty!");
            }
            ConfigPartition partition = partitions.get(0);
            if (partition == null) {
                throw new BaseException("Yunikorn partition is null!");
            }
            List<ConfigQueue> queueList = partition.getQueues();
            if (CollectionUtils.isEmpty(queueList)) {
                throw new BaseException("Yunikorn queue list ie empty!");
            }
            ConfigQueue rootQueue = queueList.get(0);
            if (rootQueue == null) {
                throw new BaseException("Yunikorn root queue is null!");
            }
            List<ConfigQueue> rootQueueList = rootQueue.getQueues();
            if (CollectionUtils.isEmpty(rootQueueList)) {
                throw new BaseException("Yunikorn root queue list is empty!");
            }
            ConfigQueue editQueue = null;
            for (ConfigQueue queue : rootQueueList) {
                if (queue.getName().equals(queueName)) {
                    editQueue = queue;
                }
            }
            if (editQueue == null) {
                throw new BaseException("Cannot find the queue with the same name: " + queueName);
            }
            return true;
        } catch (Exception ex) {
            log.info("Cannot find the queue: {}", ex.getMessage());
        }
        return false;
    }

    @Override
    public QueuesCapacity getQueuesCapacity() {
        return getQueues();
    }

    private QueueCapacity getQueueCapacity(String queueName) {
        QueuesCapacity capacity = getQueues();
        YunikornQueue root = capacity.getQueues();
        Optional<YunikornQueue> yQueue = root.getQueues().stream()
            .filter(queue -> queueName.equals(queue.getQueuename())).findFirst();
        if (!yQueue.isPresent()) {
            throw new RuntimeException(String.format("Queue:%s not found!", queueName));
        }
        return yQueue.get().getCapacities();
    }

    private YunikornConfig getYunikornConfig() {
        String currentConfig = getConfig();
        String checksum = getChecksum(currentConfig);
        // set checksum back to actual value
        Gson gson = new Gson();
        YunikornConfig yunikornConfig = gson.fromJson(currentConfig.toLowerCase(), YunikornConfig.class);
        yunikornConfig.setChecksum(checksum);
        log.info("Current config is: {}", yunikornConfig);
        return yunikornConfig;
    }

    private String getChecksum(String config) {
        // get actual checksum value
        JsonNode jsonNode = null;
        try {
            jsonNode = objectMapper.readTree(config);
        } catch (JsonProcessingException e) {
            log.error("Failed to read current config", e);
        }
        if (jsonNode == null) {
            throw new BaseException("Cannot read config tree!");
        }
        return jsonNode.get("Checksum").textValue();
    }

    private void editYunikornConfig(YunikornConfig yunikornConfig) {
        lock.writeLock().lock();
        try {
            Gson gson = new Gson();
            String newYunikornConfig = gson.toJson(yunikornConfig);
            String result = editConfig(newYunikornConfig);
            String successMsg = "Configuration updated successfully";
            if (successMsg.equals(result)) {
                log.info("edit yunikorn config successfully");
            } else {
                throw new BaseException("Cannot write config to yunikorn: " + result);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    protected void checkQueueCapacity(YunikornConfig yunikornConfig) {
        QueuesCapacity capacity = getQueues();
        if (capacity == null) {
            throw new BaseException("Cannot find queues.");
        }
        YunikornQueue root = capacity.getQueues();
        if (root == null) {
            throw new BaseException("Cannot find root queue.");
        }
        QueueCapacity rootCapacities = root.getCapacities();
        if (rootCapacities == null) {
            throw new BaseException("Cannot find root queue capacity.");
        }

        String maxStr = rootCapacities.getMaxcapacity();
        // remove '[' and ']'
        String[] max = maxStr.substring(1, maxStr.length() - 1).split(" ");
        int maxMemory = 0;
        int maxCpu = 0;
        for (String detail : max) {
            if (detail.contains("vcore")) {
                maxCpu = Integer.parseInt(detail.split(":")[1]);
            }
            if (detail.contains("memory")) {
                maxMemory = Integer.parseInt(detail.split(":")[1]);
            }
        }

        int allocateMemory = 0;
        int allocateCpu = 0;
        if (yunikornConfig == null) {
            throw new BaseException("Yunikorn config is null!");
        }
        List<ConfigPartition> partitions = yunikornConfig.getPartitions();
        if (partitions == null || partitions.size() == 0) {
            throw new BaseException("Yunikorn partition is empty!");
        }
        ConfigPartition partition = partitions.get(0);
        if (partition == null) {
            throw new BaseException("Yunikorn partition is null!");
        }
        List<ConfigQueue> queueList = partition.getQueues();
        if (queueList == null || queueList.size() == 0) {
            throw new BaseException("Yunikorn queue list is empty!");
        }
        ConfigQueue rootQueue = queueList.get(0);
        if (rootQueue == null) {
            throw new BaseException("Yunikorn root queue is null!");
        }
        List<ConfigQueue> rootQueueList = rootQueue.getQueues();
        if (rootQueueList != null && rootQueueList.size() != 0) {
            for (ConfigQueue configQueue : rootQueueList) {
                ConfigResources resources = configQueue.getResources();
                if (resources != null) {
                    if (resources.getMax() != null || resources.getGuaranteed() != null) {
                        ConfigResources.ConfigResource resource =
                            resources.getMax() != null ? resources.getMax() : resources.getGuaranteed();
                        if (resource.getMemory() != null && resource.getMemory().length() > 0) {
                            int queueMemory = Integer.parseInt(resource.getMemory());
                            int queueCpu = Integer.parseInt(resource.getVcore());
                            allocateMemory += queueMemory;
                            allocateCpu += queueCpu;
                        }

                    }
                }
            }
        }
        if (allocateCpu > maxCpu || allocateMemory > maxMemory) {
            throw new BaseException(
                "Allocate more memory than root, allocateCpu: " + allocateCpu + " , allocateMemory: "
                    + allocateMemory + ", rootCpu: " + maxCpu + ", rootMemory: " + maxMemory);

        }
    }

    private QueuesCapacity getQueues() {
        String url = k8sUriHandler.getYunikornQueuesUrl();
        try {
            String response = get(url);
            return objectMapper.readValue(response, QueuesCapacity.class);
        } catch (IOException e) {
            log.error("Failed to get queues!", e);
        }
        return null;
    }

    /**
     * Queues name are always lowercase since pattern of k8s namespace is'[a-z0-9]([-a-z0-9]*[a-z0-9])?'
     *
     * @return lowercase config json data
     */
    private String getConfig() {
        String url = k8sUriHandler.getYunikornConfigUrl();
        try {
            return get(url);
        } catch (IOException e) {
            log.error("Failed to get config!", e);
        }
        return null;
    }

    private String editConfig(String yunikornConfig) {
        String url = k8sUriHandler.getYunikornConfigUrl();
        try {
            return put(url, yunikornConfig);
        } catch (IOException e) {
            log.error("Failed to put config!", e);
        }
        return null;
    }

    private String put(String url, String data) throws IOException {
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            HttpPut httpPut = new HttpPut(url);
            httpPut.setEntity(new StringEntity(data, ContentType.APPLICATION_JSON));
            CloseableHttpResponse response = httpClient.execute(httpPut);
            String resp;
            try {
                HttpEntity entity = response.getEntity();
                resp = EntityUtils.toString(entity, "utf-8");
                EntityUtils.consume(entity);
            } finally {
                response.close();
            }
            log.info("Yunikorn send [{}], param:{}, resp:{}", url, data, resp);
            return resp;
        }
    }

    private String get(String url) throws IOException {
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            HttpGet httpGet = new HttpGet(url);
            httpGet.setHeader("Accept", "application/json");
            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
                HttpEntity entity = response.getEntity();
                return EntityUtils.toString(entity, "utf-8");
            }
        }
    }
}
